/*
 * ThreadPool.cpp
 *
 *  Created on: 2013-07-22
 *      Author: sun
 */

#include "ThreadPool.h"
#include "../common/Log.h"
#include "../common/common_define.h"
#include "../common/global.h"
#include <errno.h>
#include <stdlib.h>
#include <fcntl.h>
#include <netdb.h>

namespace sdfs {

/**
 * Entry routine run by each worker thread of the pool.
 *
 * Marks the ThreadConfig as open, then loops while targ->bIsContinue is set:
 * fetches a Task from the TaskQueue singleton, passes it to the configured
 * IRunnable, removes the task's descriptor from g_client_epoll when the
 * runner reports ECLOSED, and finally destroys the task. When the loop ends
 * the ThreadConfig is marked closed and the thread exits.
 *
 * @param arg pointer to the ThreadConfig describing this worker
 * @return does not return; the thread terminates through pthread_exit(NULL)
 */
void *pool_thread_entrance(void *arg)
{
	Log::Debug("pool_thread_entrance");
	// pthread_t does not fit the "%x" conversion on LP64 platforms, so it is
	// converted once and printed with "%lx".
	const unsigned long tid = (unsigned long)pthread_self();
	Log::Debug("thread: 0x%lx entered", tid);
	ThreadConfig *targ = (ThreadConfig *)arg;
	targ->bIsClosed = false;
	IRunnable *runner = targ->runner;
	while(targ->bIsContinue)
	{
		Log::Debug("thread: 0x%lx is getting a task...", tid);
		Task *task = TaskQueue::GetInstance()->GetTask();
		if(task == NULL)
		{
			// No task was handed out; re-check the continue flag and retry.
			Log::Debug("task is NULL");
			continue;
		}
		Log::Debug("thread:0x%lx is working, task fd: %d...", tid, task->GetDescriptor());
		void *result = runner->Run(task);
		// The runner returns a status code packed into the pointer value.
		Log::Debug("result: %ld", (long)(PTR_LEN)result);
		if((PTR_LEN)result == ECLOSED)
		{
			g_client_epoll.DeleteEvent(task->GetDescriptor());
		}
		task->Destroy();
		Log::Debug("thread:0x%lx task end", tid);
	}
	Log::Debug("thread: 0x%lx exited", tid);
	targ->bIsClosed = true;
	pthread_exit(NULL);
}

/**
 * Builds the pool: allocates `maxsize` thread groups of `capacity` threads
 * each, registers the read end of the internal pipe with a new Epoll
 * instance, and starts every thread of the first `minsize` groups.
 *
 * @param runner   object whose Run() is executed by the worker threads
 * @param minsize  number of thread groups started immediately; the start loop
 *                 indexes m_pThreads[0..minsize), so it must not exceed maxsize
 * @param maxsize  total number of thread groups allocated
 * @param capacity number of threads in each group
 */
ThreadPool::ThreadPool(IRunnable &runner, int minsize, int maxsize, int capacity) {
	m_runner = &runner;
	m_nMinSize = minsize;
	m_nCurSize = minsize;
	m_nMaxSize = maxsize;
	m_nGroupCapacity = capacity;
	m_pThreads = new std::vector<Thread*>*[m_nMaxSize];

	for(int i = 0 ; i < m_nMaxSize; ++i)
	{
		m_pThreads[i] = new std::vector<Thread*>;
		for(int j = 0 ; j < m_nGroupCapacity ; ++j)
		{
			m_pThreads[i]->push_back(new Thread(&runner, this, false, pool_thread_entrance));
		}
	}
	Log::Debug("file: " __FILE__ ", line %d", __LINE__);
	m_pfds = new std::vector<int>;
	m_epoll = new Epoll();
	m_epoll->AddEvent(m_pipe.GetReadDescriptor(), EPOLLIN | EPOLLET);
	m_bContinue = true;
	Log::Debug("file: " __FILE__ ", line %d", __LINE__);
	for(int i = 0 ; i < m_nMinSize ; ++i)
	{
		std::vector<Thread*> &group = *m_pThreads[i];
		// Index-based walk: the slot of a failed thread is rewritten in place,
		// so no iterator is invalidated and the group keeps its size.
		for(std::vector<Thread*>::size_type j = 0 ; j < group.size() ; ++j)
		{
			int result = group[j]->Start();
			if(result != 0)
			{
				Log::Error("file:" __FILE__ ", line: %d, "
						"failed to call Start, errno: %d, "
						"info: %s", __LINE__, result, STRERROR(result));
				// Replace the thread that failed to start with a fresh,
				// not-yet-started one and release the old object.
				delete group[j];
				group[j] = new Thread(&runner, this, false, pool_thread_entrance);
			}
		}
	}
	Log::Debug("file: " __FILE__ ", line %d", __LINE__);
}

int ThreadPool::addTask(int sockfd)
{
	int result =  write(m_pipe.GetWriteDescriptor(), &sockfd, sizeof(int));
	if(result <= 0)
	{
		Log::Error("file:"__FILE__", line: %d, "
				"failed to call write, errno: %d, "
				"info: %s", __LINE__, errno, STRERROR(errno));
	}
	return result;
}


void* ThreadPool::Run(void *arg)
{
	epoll_event events[EPOLL_MAX_SIZE];
	int result;
	int sockfd;
	unsigned long count = 0L;
	while(m_bContinue){
		//Log::Debug("Task Thread is waiting...");
		int nfds = m_epoll->Wait(events, EPOLL_MAX_SIZE);
		if(nfds <= 0)
		{
			continue;
		}
		for(int i = 0 ; i < nfds ; ++i)
		{
			int readfd = events[i].data.fd;
			if(readfd == m_pipe.GetReadDescriptor())
			{
				result = read(readfd, &sockfd, sizeof(int));
				if(result < 0)
				{
					Log::Error("file:"__FILE__", line: %d, "
						"failed to call epoll_ctl, errno: %d, "
						"info: %s", __LINE__, result, STRERROR(result));
					continue;
				}
				Log::Debug("New Task from: %d", sockfd);
				Task *pTask = Task::CreateTask(sockfd);
				TaskQueue::GetInstance()->Add(pTask);
				bool flag = false;
				for(vector<int>::iterator it = m_pfds->begin() ; it != m_pfds->end(); ++it)
				{
					if(*it == sockfd)
					{
						flag = true;
						break;
					}
				}
				if(!flag)
					m_pfds->push_back(sockfd);
			}
		}
	}
	pthread_exit(NULL);
}

/**
 * Removes a descriptor from m_epoll and from the list of tracked descriptors.
 *
 * When the epoll removal succeeds the descriptor is also shut down and
 * closed; when it fails only the error is logged. In both cases every entry
 * equal to `fd` is dropped from m_pfds.
 *
 * @param fd descriptor to remove
 */
void ThreadPool::RemoveFromEpoll(int fd)
{
	int result = m_epoll->DeleteEvent(fd);
	if(result != 0)
	{
		Log::Debug("RemoveFromEpoll error, errno: %d, info: %s", errno, STRERROR(errno));
	}
	else
	{
		Log::Debug("RemoveFromEpoll success, fd:%d", fd);
		shutdown(fd, SHUT_RDWR);
		close(fd);
	}
	// erase() invalidates the iterator it is given, so continue from the
	// iterator it returns instead of incrementing the stale one.
	std::vector<int>::iterator it = m_pfds->begin();
	while(it != m_pfds->end())
	{
		if(*it == fd)
			it = m_pfds->erase(it);
		else
			++it;
	}
}

void ThreadPool::Stop()
{
	Log::Debug("ThreadPool: Stop");
	m_bContinue = false;
	Log::Debug("ThreadPool: remove event");
	int result = m_epoll->DeleteEvent( m_pipe.GetReadDescriptor());
	if(result != 0)
	{
		Log::Error("file:"__FILE__", line: %d, "
			"failed to call epoll_ctl, errno: %d, "
			"info: %s", __LINE__, errno, STRERROR(errno));
	}
	//need a map to record connected socket descriptor
	for(vector<int>::iterator it = m_pfds->begin() ; it != m_pfds->end() ; ++it)
	{
		result = m_epoll->DeleteEvent(*it);
		if(result != 0)
		{
			Log::Error("file:"__FILE__", line: %d, "
				"failed to call epoll_ctl, errno: %d, "
				"info: %s", __LINE__, errno, STRERROR(errno));
		}
	}
	Log::Debug("ThreadPool: stop thread pool");
	for(int i = 0 ; i < m_nCurSize ; ++i)
	{
		Log::Debug("ThreadPool: stopping thread: %d", i);
		for(int j = 0 ; j < m_nGroupCapacity; ++j)
			m_pThreads[i]->at(j)->Stop();
	}
	result = TaskQueue::GetInstance()->StopWaiting();
	if(result != 0)
	{
		Log::Error("file:"__FILE__", line: %d, "
			"failed to call StopWaiting, errno: %d, "
			"info: %s", __LINE__, result, STRERROR(result));
	}
	Log::Debug("ThreadPool: Stop all");

}

/**
 * Reports the size of the TaskQueue singleton.
 *
 * @return the value of TaskQueue::GetSize()
 */
int ThreadPool::TaskSize()
{
	const int pending = TaskQueue::GetInstance()->GetSize();
	return pending;
}

void ThreadPool::StartNewThreadGroup()
{
	if(m_nCurSize >= m_nMaxSize)
		return;
	for(vector<Thread*>::iterator it = m_pThreads[m_nCurSize]->begin() ;
			it != m_pThreads[m_nCurSize]->end(); ++it )
	{
		(*it)->Start();
	}
	m_nCurSize++;
}

void ThreadPool::StopWorkingThreadGroup()
{
	if(m_nCurSize <= m_nMinSize)
		return;
	m_nCurSize--;
	for(vector<Thread*>::iterator it = m_pThreads[m_nCurSize]->begin() ;
				it != m_pThreads[m_nCurSize]->end(); ++it )
	{
		(*it)->Stop();
	}
}

/**
 * Releases everything the constructor allocated: every Thread object, each
 * group vector, the group array, the list of tracked descriptors and the
 * Epoll instance.
 */
ThreadPool::~ThreadPool() {
	Log::Debug("~ThreadPool()");

	for(int i = 0 ; i < m_nMaxSize ; ++i)
	{
		std::vector<Thread*> &group = *m_pThreads[i];
		for(std::vector<Thread*>::iterator it = group.begin(); it != group.end(); ++it)
		{
			delete *it;
		}
		delete m_pThreads[i];
	}
	delete []m_pThreads;
	delete m_pfds;
	// m_epoll is created with new in the constructor and owned by the pool.
	delete m_epoll;
}

} /* namespace sdfs */

